package com.atguigu.gmall.realtime.app.dws;

import com.atguigu.gmall.realtime.bean.KeywordBean;
import com.atguigu.gmall.realtime.func.KeyWordUDTF;
import com.atguigu.gmall.realtime.util.ClickHouseUtil;
import com.atguigu.gmall.realtime.util.GmallConstant;
import com.atguigu.gmall.realtime.util.KafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author caodan
 * @version 1.0
 * @date 2022-10-09 13:47
 * Traffic domain: per-window light aggregation of page views at source-keyword granularity
 */
public class DwsTrafficSourceKeywordPageViewWindow {

    public static void main(String[] args) throws Exception {

        // Create the stream execution environment and the table environment on top of it
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//        tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(5));
        tableEnv.createTemporarySystemFunction("ik_analyze", KeyWordUDTF.class);
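        /*
         * A minimal sketch of what KeyWordUDTF is assumed to look like (the real class lives in
         * com.atguigu.gmall.realtime.func): a TableFunction<Row> whose eval method tokenizes the
         * search phrase and emits one row per keyword, e.g.
         *
         *   @FunctionHint(output = @DataTypeHint("ROW<keyword STRING>"))
         *   public class KeyWordUDTF extends TableFunction<Row> {
         *       public void eval(String text) {
         *           // KeywordUtil.analyze is a hypothetical IK-analyzer wrapper returning the tokens
         *           for (String word : KeywordUtil.analyze(text)) {
         *               collect(Row.of(word));
         *           }
         *       }
         *   }
         */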
        // Checkpoint-related settings (left commented out for local testing)
        /*env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(30),Time.seconds(3)));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/dwsTrafficSourceKeywordPageViewWindow");
        System.setProperty("HADOOP_USER_NAME","atguigu");*/

        // Read the page log data from the Kafka topic dwd_traffic_page_log
        String topic = "dwd_traffic_page_log";
        String groupId = "dws_traffic_source_keyword_page_view_window";
        tableEnv.executeSql("create table page_log(\n" +
                "`common` map<string, string>,\n" +
                "`page` map<string, string>,\n" +
                "`ts` bigint,\n" +
                "row_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000, 'yyyy-MM-dd HH:mm:ss')),\n" +
                "WATERMARK FOR row_time AS row_time - INTERVAL '3' SECOND\n" +
                ")" + KafkaUtil.getKafkaDDLPorps(topic, groupId));
        // Keep only the records that represent keyword searches (previous page is the search page,
        // item type is 'keyword'); page['item'] then holds the full search phrase
        Table searchTable = tableEnv.sqlQuery("select\n" +
                "page['item'] full_word,\n" +
                "row_time\n" +
                "from page_log\n" +
                "where page['item'] is not null\n" +
                "and page['last_page_id'] = 'search'\n" +
                "and page['item_type'] = 'keyword'");
        tableEnv.createTemporaryView("search_table", searchTable);
        // Tokenize the search phrase with the custom UDTF and join the keywords back to the remaining fields
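        // LATERAL TABLE(ik_analyze(full_word)) cross-joins each search record with every keyword the
        // UDTF emits for its phrase, so one search row fans out into one row per token.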
        Table splitTable = tableEnv.sqlQuery("select\n" +
                "keyword,\n" +
                "row_time \n" +
                "from search_table,\n" +
                "lateral table(ik_analyze(full_word))\n" +
                "as t(keyword)");
        tableEnv.createTemporaryView("split_table", splitTable);
        // Group, window and aggregate
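        // TUMBLE(row_time, INTERVAL '10' SECOND) assigns each record to a fixed 10-second event-time
        // window; grouping by window and keyword yields one count per (window, keyword), so the
        // result stays append-only and can be converted with toAppendStream below.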
        Table keywordBeanSearch = tableEnv.sqlQuery("select\n" +
                "DATE_FORMAT(TUMBLE_START(row_time, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') stt,\n" +
                "DATE_FORMAT(TUMBLE_END(row_time, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') edt,\n" +
                "'" + GmallConstant.KEYWORD_SEARCH + "' source,\n" +
                "keyword,\n" +
                "count(*) keyword_count,\n" +
                "UNIX_TIMESTAMP() * 1000 ts\n" +
                "from split_table\n" +
                "GROUP BY TUMBLE(row_time, INTERVAL '10' SECOND),keyword");

        tableEnv.createTemporaryView("key_word_search", KeywordBeanSearch);

        // tableEnv.executeSql("select * from key_word_search").print();

        // Convert the result table into a DataStream of KeywordBean
        DataStream<KeywordBean> keywordBeanDataStream = tableEnv.toAppendStream(keywordBeanSearch, KeywordBean.class);
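        // toAppendStream maps the table columns to KeywordBean by field name, so the bean is presumably
        // a POJO with fields stt, edt, source, keyword, keyword_count and ts matching the aliases above.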

        // Write the stream into ClickHouse
        keywordBeanDataStream.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_traffic_source_keyword_page_view_window values(?,?,?,?,?,?)")
        );
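        // ClickHouseUtil.getJdbcSink is assumed to build a JDBC sink that fills the six "?" placeholders
        // from the KeywordBean fields (stt, edt, source, keyword, keyword_count, ts) via reflection, so
        // the bean's field order must line up with the dws_traffic_source_keyword_page_view_window columns.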

        env.execute();
    }
}
